/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel.nio;

import io.netty.channel.*;
import io.netty.util.IntSupplier;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReflectionUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * {@link SingleThreadEventLoop} implementation which registers the {@link Channel}s to a
 * {@link Selector} and so does the multiplexing of these in the event loop.
 */
public final class NioEventLoop extends SingleThreadEventLoop {
  
  // Logger shared by all NioEventLoop instances.
  private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
  
  // Number of cancelled keys after which cancel(SelectionKey) requests another selectNow() pass.
  private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
  
  // When true, openSelector() returns the plain selector without instrumenting its key sets.
  private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
      SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
  
  // select() only logs premature returns once their count exceeds this value; the static
  // initializer also turns a configured rebuild threshold below this value into 0.
  private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
  // Number of consecutive premature select() returns that triggers a selector rebuild;
  // select() skips the rebuild when this is 0.
  private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
  
  // Passed to selectStrategy.calculateStrategy(...) in run(); get() delegates to selectNow().
  private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
      return selectNow();
    }
  };
  
  // Workaround for JDK NIO bug: define the "sun.nio.ch.bugLevel" system property when it is
  // missing, then resolve the selector auto-rebuild threshold from its system property.
  //
  // See:
  // - http://bugs.sun.com/view_bug.do?bug_id=6427854
  // - https://github.com/netty/netty/issues/203
  static {
    final String bugLevelKey = "sun.nio.ch.bugLevel";
    if (SystemPropertyUtil.get(bugLevelKey) == null) {
      try {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
          @Override
          public Void run() {
            System.setProperty(bugLevelKey, "");
            return null;
          }
        });
      } catch (final SecurityException e) {
        logger.debug("Unable to get/set System Property: " + bugLevelKey, e);
      }
    }

    final int configuredThreshold =
        SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    // A value below the minimum number of premature returns is stored as 0.
    SELECTOR_AUTO_REBUILD_THRESHOLD =
        configuredThreshold < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configuredThreshold;

    if (logger.isDebugEnabled()) {
      logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
      logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
  }
  
  /**
   * The NIO {@link Selector} this loop selects on. openSelector() makes it either the raw
   * selector or a {@code SelectedSelectionKeySetSelector} wrapping the raw selector.
   */
  private Selector selector;
  // The selector exactly as returned by provider.openSelector(); register0() registers channels on it.
  private Selector unwrappedSelector;
  // Key set installed into the selector by openSelector() when instrumentation succeeds;
  // processSelectedKeys() falls back to selector.selectedKeys() while this is null.
  private SelectedSelectionKeySet selectedKeys;
  
  // Provider used by openSelector() to open selectors.
  private final SelectorProvider provider;
  
  /**
   * Boolean that controls whether a blocked Selector.select should
   * break out of its selection process. In our case we use a timeout for
   * the select method and the select method will block for that time unless
   * woken up.
   */
  private final AtomicBoolean wakenUp = new AtomicBoolean();
  // Normalized deadline computed by select(); deadlines of newly submitted scheduled tasks
  // are compared against it in before/afterScheduledTaskSubmitted.
  private volatile long nextWakeupTime = Long.MAX_VALUE;
  
  // Strategy consulted at the top of every run() iteration.
  private final SelectStrategy selectStrategy;
  
  // Desired percentage of loop time spent on I/O; see setIoRatio(int).
  private volatile int ioRatio = 50;
  // Keys cancelled through cancel(SelectionKey) since the counter was last reset.
  private int cancelledKeys;
  // Set by cancel(SelectionKey) once CLEANUP_INTERVAL keys have been cancelled; cleared by
  // selectAgain() and at the start of each processing pass in run().
  private boolean needsToSelectAgain;
  
  /**
   * Creates the event loop and opens its {@link Selector}.
   *
   * @param parent forwarded to the superclass constructor
   * @param executor forwarded to the superclass constructor
   * @param selectorProvider provider used to open the selector(s) of this loop
   * @param strategy strategy consulted on every iteration of {@code run()}
   * @param rejectedExecutionHandler forwarded to the superclass constructor
   * @param queueFactory optional factory for the two task queues; may be {@code null}
   * @throws NullPointerException if {@code selectorProvider} or {@code strategy} is {@code null}
   */
  NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
               SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
               EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
        rejectedExecutionHandler);
    if (selectorProvider == null) {
      throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
      throw new NullPointerException("selectStrategy");
    }
    selectStrategy = strategy;
    provider = selectorProvider;
    // The provider has to be assigned first because openSelector() reads it.
    final SelectorTuple opened = openSelector();
    unwrappedSelector = opened.unwrappedSelector;
    selector = opened.selector;
  }
  
  // Creates a task queue, delegating to the given factory when one was supplied.
  private static Queue<Runnable> newTaskQueue(
      EventLoopTaskQueueFactory queueFactory) {
    return queueFactory != null
        ? queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS)
        : newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
  }
  
  /**
   * Immutable pair of the raw selector and the selector the loop selects on. The two are the
   * same object when no wrapper is used.
   */
  private static final class SelectorTuple {
    final Selector unwrappedSelector;
    final Selector selector;

    // No wrapper: the raw selector fills both roles.
    SelectorTuple(Selector unwrappedSelector) {
      this(unwrappedSelector, unwrappedSelector);
    }

    SelectorTuple(Selector unwrappedSelector, Selector selector) {
      this.unwrappedSelector = unwrappedSelector;
      this.selector = selector;
    }
  }
  
  /**
   * Opens a new selector through {@code provider} and, unless disabled or not possible, replaces
   * the {@code selectedKeys} / {@code publicSelectedKeys} fields of the selector implementation
   * with a {@link SelectedSelectionKeySet}.
   *
   * <p>Side effect: the {@code selectedKeys} field of this loop is set to the installed key set on
   * success and to {@code null} when installing it reported an exception. It is left untouched on
   * the two earlier fall-back returns.
   *
   * @return a tuple holding the raw selector and the selector to select on (a
   *         {@code SelectedSelectionKeySetSelector} wrapper when instrumentation succeeded,
   *         otherwise the raw selector itself)
   * @throws ChannelException if the provider fails to open a selector
   */
  private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
      unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
      throw new ChannelException("failed to open a new selector", e);
    }
    
    // Optimization switched off via system property: use the selector as-is.
    if (DISABLE_KEY_SET_OPTIMIZATION) {
      return new SelectorTuple(unwrappedSelector);
    }
    
    // Look up sun.nio.ch.SelectorImpl without initializing it. A lookup failure is handed
    // back as the result value instead of being thrown.
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
      @Override
      public Object run() {
        try {
          return Class.forName(
              "sun.nio.ch.SelectorImpl",
              false,
              PlatformDependent.getSystemClassLoader());
        } catch (Throwable cause) {
          return cause;
        }
      }
    });
    
    if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
      if (maybeSelectorImplClass instanceof Throwable) {
        Throwable t = (Throwable) maybeSelectorImplClass;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
      }
      return new SelectorTuple(unwrappedSelector);
    }
    
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
    // Install the key set into both fields of the selector. The action returns null on
    // success, or the Throwable describing why the installation was not possible.
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
      @Override
      public Object run() {
        try {
          Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
          Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
          
          if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
            // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
            // This allows us to also do this in Java9+ without any extra flags.
            long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
            long publicSelectedKeysFieldOffset =
                PlatformDependent.objectFieldOffset(publicSelectedKeysField);
            
            if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
              PlatformDependent.putObject(
                  unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
              PlatformDependent.putObject(
                  unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
              return null;
            }
            // We could not retrieve the offset, lets try reflection as last-resort.
          }
          
          // Reflection path: both fields must be made accessible before they can be written.
          Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
          if (cause != null) {
            return cause;
          }
          cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
          if (cause != null) {
            return cause;
          }
          
          selectedKeysField.set(unwrappedSelector, selectedKeySet);
          publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
          return null;
        } catch (NoSuchFieldException e) {
          return e;
        } catch (IllegalAccessException e) {
          return e;
        }
      }
    });
    
    // Installation failed: clear the loop's key set and fall back to the plain selector.
    if (maybeException instanceof Exception) {
      selectedKeys = null;
      Exception e = (Exception) maybeException;
      logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
      return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    return new SelectorTuple(unwrappedSelector,
        new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
  }
  
  /**
   * Returns the {@link SelectorProvider} this {@link NioEventLoop} uses to open its {@link Selector}.
   *
   * @return the provider passed to the constructor
   */
  public SelectorProvider selectorProvider() {
    return this.provider;
  }
  
  // Instance-level hook; uses the same queue construction as the static helper.
  @Override
  protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    final Queue<Runnable> queue = newTaskQueue0(maxPendingTasks);
    return queue;
  }
  
  // Creates an MPSC queue: unbounded when maxPendingTasks is Integer.MAX_VALUE, otherwise
  // created with maxPendingTasks as its capacity argument.
  private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // This event loop never calls takeTask()
    if (maxPendingTasks == Integer.MAX_VALUE) {
      return PlatformDependent.<Runnable>newMpscQueue();
    }
    return PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
  }
  
  /**
   * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, with the
   * {@link Selector} of this event loop. After registration the given {@code task} is invoked by
   * this event loop whenever the channel becomes ready.
   *
   * <p>When called from outside the event loop, the registration is submitted to the loop and the
   * caller waits for it. If the wait is interrupted, the interrupt flag is restored and the method
   * returns without waiting further.
   *
   * @param ch the channel to register
   * @param interestOps the interest set; must be non-zero and a subset of {@code ch.validOps()}
   * @param task the task to run when the channel is ready
   * @throws NullPointerException if {@code ch} or {@code task} is {@code null}
   * @throws IllegalArgumentException if {@code interestOps} is zero or contains unsupported ops
   * @throws IllegalStateException if this event loop is shut down
   */
  public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    if (ch == null) {
      throw new NullPointerException("ch");
    }
    if (interestOps == 0) {
      throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    final int unsupportedOps = interestOps & ~ch.validOps();
    if (unsupportedOps != 0) {
      throw new IllegalArgumentException(
          "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
    }
    if (task == null) {
      throw new NullPointerException("task");
    }

    if (isShutdown()) {
      throw new IllegalStateException("event loop shut down");
    }

    if (inEventLoop()) {
      register0(ch, interestOps, task);
      return;
    }

    // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
    // may block for a long time while trying to obtain an internal lock that may be held while selecting.
    final Runnable registration = new Runnable() {
      @Override
      public void run() {
        register0(ch, interestOps, task);
      }
    };
    try {
      submit(registration).sync();
    } catch (InterruptedException ignore) {
      // The registration was already scheduled, so only restore the interrupt status.
      Thread.currentThread().interrupt();
    }
  }
  
  // Registers the channel on the raw selector with the task as attachment; any failure is
  // rethrown wrapped in an EventLoopException.
  private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
    try {
      ch.register(unwrappedSelector, interestOps, task);
    } catch (Exception cause) {
      throw new EventLoopException("failed to register a channel", cause);
    }
  }
  
  /**
   * Returns the desired percentage of event-loop time to be spent on I/O.
   *
   * @return the current I/O ratio
   */
  public int getIoRatio() {
    return this.ioRatio;
  }
  
  /**
   * Sets the desired percentage of event-loop time to be spent on I/O. Valid values are 1-100.
   * The default is {@code 50}, meaning the loop tries to spend equal time on I/O and on non-I/O
   * tasks; lower values leave more time for non-I/O tasks. With {@code 100} the balancing is
   * disabled and the loop does not try to bound the time spent on non-I/O tasks.
   *
   * @param ioRatio the new ratio, {@code 0 < ioRatio <= 100}
   * @throws IllegalArgumentException if {@code ioRatio} is outside that range
   */
  public void setIoRatio(int ioRatio) {
    final boolean withinRange = ioRatio > 0 && ioRatio <= 100;
    if (!withinRange) {
      throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
    }
    this.ioRatio = ioRatio;
  }
  
  /**
   * Replaces the current {@link Selector} of this event loop with a newly created one to work
   * around the infamous epoll 100% CPU bug. The rebuild runs immediately when called from the
   * event loop; otherwise it is handed to the loop via {@code execute}.
   */
  public void rebuildSelector() {
    if (inEventLoop()) {
      rebuildSelector0();
      return;
    }
    execute(new Runnable() {
      @Override
      public void run() {
        rebuildSelector0();
      }
    });
  }
  
  // Number of keys known to the selector minus the keys counted as cancelled by cancel(SelectionKey).
  @Override
  public int registeredChannels() {
    final int totalKeys = selector.keys().size();
    return totalKeys - cancelledKeys;
  }
  
  /**
   * Opens a fresh selector, moves every valid key of the current selector over to it, switches
   * this loop to the new selector and closes the old one.
   *
   * <p>Does nothing when there is no current selector or when the new selector cannot be opened.
   * A channel that fails to re-register is closed (for {@code AbstractNioChannel} attachments) or
   * its task is notified through {@code invokeChannelUnregistered}.
   */
  private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;
    
    if (oldSelector == null) {
      return;
    }
    
    try {
      newSelectorTuple = openSelector();
    } catch (Exception e) {
      logger.warn("Failed to create a new Selector.", e);
      return;
    }
    
    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key : oldSelector.keys()) {
      Object a = key.attachment();
      try {
        // Skip keys that are no longer valid or whose channel is already on the new selector.
        if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
          continue;
        }
        
        // Read the interest set before cancelling, then re-register with the same
        // interest set and attachment.
        int interestOps = key.interestOps();
        key.cancel();
        SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
        if (a instanceof AbstractNioChannel) {
          // Update SelectionKey
          ((AbstractNioChannel) a).selectionKey = newKey;
        }
        nChannels++;
      } catch (Exception e) {
        logger.warn("Failed to re-register a Channel to the new Selector.", e);
        if (a instanceof AbstractNioChannel) {
          AbstractNioChannel ch = (AbstractNioChannel) a;
          ch.unsafe().close(ch.unsafe().voidPromise());
        } else {
          @SuppressWarnings("unchecked")
          NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
          invokeChannelUnregistered(task, key, e);
        }
      }
    }
    
    // Switch the loop over to the new selector pair.
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
    try {
      // time to close the old selector as everything else is registered to the new one
      oldSelector.close();
    } catch (Throwable t) {
      if (logger.isWarnEnabled()) {
        logger.warn("Failed to close the old Selector.", t);
      }
    }
    
    if (logger.isInfoEnabled()) {
      logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
  }
  
  /**
   * Main loop: on every iteration consults the select strategy, optionally performs a select,
   * processes the selected keys, runs queued tasks, and finally checks for shutdown. Returns only
   * once the loop is shutting down and {@code confirmShutdown()} reports {@code true}.
   */
  @Override
  // Endless loop that waits for events and processes them.
  protected void run() {
    for (; ; ) {
      try {
        try {
          // Ask the select strategy what to do in this iteration.
          switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
            case SelectStrategy.CONTINUE:
              continue;
            
            case SelectStrategy.BUSY_WAIT:
              // fall-through to SELECT since the busy-wait is not supported with NIO
            
            case SelectStrategy.SELECT:
              select(wakenUp.getAndSet(false));
              
              // 'wakenUp.compareAndSet(false, true)' is always evaluated
              // before calling 'selector.wakeup()' to reduce the wake-up
              // overhead. (Selector.wakeup() is an expensive operation.)
              //
              // However, there is a race condition in this approach.
              // The race condition is triggered when 'wakenUp' is set to
              // true too early.
              //
              // 'wakenUp' is set to true too early if:
              // 1) Selector is waken up between 'wakenUp.set(false)' and
              //    'selector.select(...)'. (BAD)
              // 2) Selector is waken up between 'selector.select(...)' and
              //    'if (wakenUp.get()) { ... }'. (OK)
              //
              // In the first case, 'wakenUp' is set to true and the
              // following 'selector.select(...)' will wake up immediately.
              // Until 'wakenUp' is set to false again in the next round,
              // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
              // any attempt to wake up the Selector will fail, too, causing
              // the following 'selector.select(...)' call to block
              // unnecessarily.
              //
              // To fix this problem, we wake up the selector again if wakenUp
              // is true immediately after selector.select(...).
              // It is inefficient in that it wakes up the selector for both
              // the first case (BAD - wake-up required) and the second case
              // (OK - no wake-up required).
              
              if (wakenUp.get()) {
                selector.wakeup();
              }
              // fall through
            default:
          }
        } catch (IOException e) {
          // If we receive an IOException here its because the Selector is messed up. Let's rebuild
          // the selector and retry. https://github.com/netty/netty/issues/8566
          rebuildSelector0();
          handleLoopException(e);
          continue;
        }
        
        // Start a fresh processing pass: reset the cancel bookkeeping used by cancel(SelectionKey).
        cancelledKeys = 0;
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        if (ioRatio == 100) {
          // No balancing: process all keys, then run all queued tasks without a time budget.
          try {
            processSelectedKeys();
          } finally {
            // Ensure we always run tasks.
            runAllTasks();
          }
        } else {
          final long ioStartTime = System.nanoTime();
          try {
            processSelectedKeys();
          } finally {
            // Ensure we always run tasks.
            // The task budget is the measured I/O time scaled by (100 - ioRatio) / ioRatio.
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
          }
        }
      } catch (Throwable t) {
        handleLoopException(t);
      }
      // Always handle shutdown even if the loop processing threw an exception.
      try {
        if (isShuttingDown()) {
          closeAll();
          if (confirmShutdown()) {
            return;
          }
        }
      } catch (Throwable t) {
        handleLoopException(t);
      }
    }
  }
  
  // Logs an unexpected loop failure and then pauses the calling thread for one second.
  private static void handleLoopException(Throwable t) {
    logger.warn("Unexpected exception in the selector loop.", t);

    try {
      // Back off so that a failure repeating on every iteration cannot lead to
      // excessive CPU consumption.
      Thread.sleep(1000);
    } catch (InterruptedException ignored) {
      // Deliberately ignored; the loop simply carries on.
    }
  }
  
  // Dispatches to the array-based path when the instrumented key set is installed, otherwise
  // to the iterator-based path over the JDK's selector.selectedKeys().
  private void processSelectedKeys() {
    if (selectedKeys == null) {
      // No instrumented key set: use the selector's own selected-key set.
      processSelectedKeysPlain(selector.selectedKeys());
      return;
    }
    // Instrumented key set available: walk its backing array directly.
    processSelectedKeysOptimized();
  }
  
  // Closes the selector; a failure to close is logged and not propagated.
  @Override
  protected void cleanup() {
    try {
      selector.close();
    } catch (IOException closeFailure) {
      logger.warn("Failed to close a selector.", closeFailure);
    }
  }
  
  // Cancels the key and counts the cancellation. Once CLEANUP_INTERVAL cancellations have
  // accumulated, the counter is reset and another select pass is requested, since many of
  // the remaining selected keys may be stale by then.
  void cancel(SelectionKey key) {
    key.cancel();
    if (++cancelledKeys < CLEANUP_INTERVAL) {
      return;
    }
    cancelledKeys = 0;
    needsToSelectAgain = true;
  }
  
  // Polls the next task from the superclass and, if a re-select was requested in the
  // meantime, performs it before handing the task back.
  @Override
  protected Runnable pollTask() {
    final Runnable next = super.pollTask();
    if (needsToSelectAgain) {
      selectAgain();
    }
    return next;
  }
  
  // Iterator-based processing of the JDK selected-key set. Each key is removed from the set
  // before it is dispatched; when a re-select was requested the set and iterator are refreshed.
  private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // Return early on an empty set so that no Iterator garbage is created when there is
    // nothing to process. See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
      return;
    }

    Iterator<SelectionKey> it = selectedKeys.iterator();
    while (true) {
      final SelectionKey key = it.next();
      final Object attachment = key.attachment();
      // Remove the key just returned by next() from the selected set.
      it.remove();

      if (attachment instanceof AbstractNioChannel) {
        processSelectedKey(key, (AbstractNioChannel) attachment);
      } else {
        @SuppressWarnings("unchecked")
        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
        processSelectedKey(key, task);
      }

      if (!it.hasNext()) {
        break;
      }
      if (!needsToSelectAgain) {
        continue;
      }

      selectAgain();
      selectedKeys = selector.selectedKeys();
      if (selectedKeys.isEmpty()) {
        break;
      }
      // Create the iterator again to avoid ConcurrentModificationException
      it = selectedKeys.iterator();
    }
  }
  
  // Array-based processing of the instrumented key set. When a re-select is requested, the
  // unprocessed remainder is reset, the selector is polled again and processing restarts at
  // index 0.
  private void processSelectedKeysOptimized() {
    int idx = 0;
    while (idx < selectedKeys.size) {
      final SelectionKey key = selectedKeys.keys[idx];
      // Null out the slot so the key can be GC'ed once the Channel is closed.
      // See https://github.com/netty/netty/issues/2363
      selectedKeys.keys[idx] = null;

      // The attachment is whatever was passed as attachment when the channel was registered.
      final Object attachment = key.attachment();
      if (attachment instanceof AbstractNioChannel) {
        processSelectedKey(key, (AbstractNioChannel) attachment);
      } else {
        @SuppressWarnings("unchecked")
        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
        processSelectedKey(key, task);
      }

      if (needsToSelectAgain) {
        // Null out the remaining entries in the array to allow them to be GC'ed once the
        // Channel closes. See https://github.com/netty/netty/issues/2363
        selectedKeys.reset(idx + 1);

        selectAgain();
        idx = 0;
      } else {
        idx++;
      }
    }
  }
  
  // Handles one selected key whose attachment is a Netty channel: closes the channel when the
  // key is invalid and the channel still belongs to this loop, otherwise dispatches the ready
  // ops in the order connect, write, read/accept.
  private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
      final EventLoop owner;
      try {
        owner = ch.eventLoop();
      } catch (Throwable ignored) {
        // If the channel implementation throws an exception because there is no event loop, we ignore this
        // because we are only trying to determine if ch is registered to this event loop and thus has authority
        // to close ch.
        return;
      }
      // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
      // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
      // still healthy and should not be closed.
      // See https://github.com/netty/netty/issues/5125
      if (owner == this) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
      }
      return;
    }

    try {
      final int readyOps = k.readyOps();
      // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
      // the NIO JDK channel implementation may throw a NotYetConnectedException.
      if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT);

        unsafe.finishConnect();
      }

      // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
      if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
      }

      // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
      // to a spin loop
      final boolean readOrAccept =
          (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0;
      if (readOrAccept || readyOps == 0) {
        unsafe.read();
      }
    } catch (CancelledKeyException ignored) {
      unsafe.close(unsafe.voidPromise());
    }
  }
  
  // Runs a user-supplied NioTask for a ready key and notifies the task when the key ends up
  // unregistered. The outcome marker distinguishes three cases:
  //   0 - channelReady() did not complete and the catch block did not finish either
  //   1 - channelReady() returned normally
  //   2 - channelReady() threw an Exception that was fully handled in the catch block
  private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
    int outcome = 0;
    try {
      task.channelReady(k.channel(), k);
      outcome = 1;
    } catch (Exception e) {
      k.cancel();
      invokeChannelUnregistered(task, k, e);
      outcome = 2;
    } finally {
      if (outcome == 0) {
        k.cancel();
        invokeChannelUnregistered(task, k, null);
      } else if (outcome == 1 && !k.isValid()) {
        // Cancelled by channelReady()
        invokeChannelUnregistered(task, k, null);
      }
    }
  }
  
  // Closes every Netty channel registered with the selector, and cancels and notifies every
  // NioTask registration. Channels are collected first and closed after the key iteration.
  private void closeAll() {
    // Poll the selector once more first; the intent is to drop already-cancelled keys.
    selectAgain();
    final Set<SelectionKey> registered = selector.keys();
    final Collection<AbstractNioChannel> toClose =
        new ArrayList<AbstractNioChannel>(registered.size());
    for (SelectionKey key : registered) {
      final Object attachment = key.attachment();
      if (attachment instanceof AbstractNioChannel) {
        toClose.add((AbstractNioChannel) attachment);
        continue;
      }
      key.cancel();
      @SuppressWarnings("unchecked")
      NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
      invokeChannelUnregistered(task, key, null);
    }

    for (AbstractNioChannel channel : toClose) {
      channel.unsafe().close(channel.unsafe().voidPromise());
    }
  }
  
  // Notifies the task that its channel was unregistered; an Exception thrown by the callback
  // is logged and not propagated.
  private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
    try {
      task.channelUnregistered(k.channel(), cause);
    } catch (Exception callbackFailure) {
      logger.warn("Unexpected exception while running NioTask.channelUnregistered()", callbackFailure);
    }
  }
  
  // Wakes the selector up when called from outside the event loop, but only if this call is
  // the one that flips wakenUp from false to true.
  @Override
  protected void wakeup(boolean inEventLoop) {
    if (inEventLoop) {
      return;
    }
    if (wakenUp.compareAndSet(false, true)) {
      selector.wakeup();
    }
  }
  
  // True when the new task's deadline is earlier than the wake-up time currently recorded by select().
  @Override
  protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
    final long wakeupDeadline = nextWakeupTime;
    return deadlineNanos < wakeupDeadline;
  }
  
  // Same comparison as beforeScheduledTaskSubmitted, evaluated after the task was submitted.
  @Override
  protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
    final long wakeupDeadline = nextWakeupTime;
    return deadlineNanos < wakeupDeadline;
  }
  
  // Package-private accessor for the raw selector obtained from the provider.
  Selector unwrappedSelector() {
    return this.unwrappedSelector;
  }
  
  // Performs a non-blocking select and returns the number of ready keys. Afterwards the
  // selector is woken up again when wakenUp is set, so that the wake-up state is restored.
  int selectNow() throws IOException {
    final int readyCount;
    try {
      readyCount = selector.selectNow();
    } finally {
      // restore wakeup state if needed
      if (wakenUp.get()) {
        selector.wakeup();
      }
    }
    return readyCount;
  }
  
  /**
   * Core idea: while there is nothing to do, block in {@code Selector.select(timeout)} until the
   * deadline derived from {@code delayNanos(...)} is reached; leave the loop early when keys were
   * selected, the selector was woken up, or tasks are pending.
   *
   * <p>Also counts consecutive premature returns of {@code select(...)} and rebuilds the selector
   * once {@code SELECTOR_AUTO_REBUILD_THRESHOLD} is reached. A {@link CancelledKeyException} raised
   * while selecting is logged at debug level and swallowed.
   *
   * @param oldWakenUp the value {@code wakenUp} held before the caller reset it to {@code false}
   * @throws IOException if a select operation on the selector fails
   */
  private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
      int selectCnt = 0;
      long currentTimeNanos = System.nanoTime();
      // The select deadline is the current time plus delayNanos(...)
      // (presumably the delay until the next scheduled task; defined in the superclass).
      long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
      
      // Publish the deadline, normalized against initialNanoTime(), for the
      // before/afterScheduledTaskSubmitted checks.
      long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
      if (nextWakeupTime != normalizedDeadlineNanos) {
        nextWakeupTime = normalizedDeadlineNanos;
      }
      
      for (; ; ) {
        // Remaining time until the deadline, rounded to the nearest millisecond.
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
        if (timeoutMillis <= 0) { // The deadline has been reached: do not block any longer.
          if (selectCnt == 0) {
            // Non-blocking select; selects nothing when no channel is ready.
            selector.selectNow();
            selectCnt = 1;
          }
          break;
        }
        
        // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
        // Selector#wakeup. So we need to check task queue again before executing select operation.
        // If we don't, the task might be pended until select operation was timed out.
        // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
        if (hasTasks() && wakenUp.compareAndSet(false, true)) {
          selector.selectNow();
          selectCnt = 1;
          break;
        }
        // Blocking select; it may also return early when another thread wakes the selector up.
        int selectedKeys = selector.select(timeoutMillis);
        selectCnt++;
        
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
          // - Selected something,
          // - waken up by user, or
          // - the task queue has a pending task.
          // - a scheduled task is ready for processing
          break;
        }
        if (Thread.interrupted()) {
          // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
          // As this is most likely a bug in the handler of the user or it's client library we will
          // also log it.
          //
          // See https://github.com/netty/netty/issues/2426
          if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                "Thread.currentThread().interrupt() was called. Use " +
                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
          }
          selectCnt = 1;
          break;
        }
        
        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
          // timeoutMillis elapsed without anything selected.
          selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
          // The code exists in an extra method to ensure the method is not too big to inline as this
          // branch is not very likely to get hit very frequently.
          selector = selectRebuildSelector(selectCnt);
          selectCnt = 1;
          break;
        }
        
        currentTimeNanos = time;
      }
      
      if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
        if (logger.isDebugEnabled()) {
          logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
              selectCnt - 1, selector);
        }
      }
    } catch (CancelledKeyException e) {
      if (logger.isDebugEnabled()) {
        logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
            selector, e);
      }
      // Harmless exception - log anyway
    }
  }
  
  // Called by select() after too many consecutive premature returns: logs a warning, rebuilds
  // the selector, polls the new selector once and returns it.
  private Selector selectRebuildSelector(int selectCnt) throws IOException {
    // The selector returned prematurely many times in a row.
    // Rebuild the selector to work around the problem.
    logger.warn(
        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
        selectCnt, selector);

    rebuildSelector();
    final Selector rebuilt = this.selector;

    // Select again to populate selectedKeys.
    rebuilt.selectNow();
    return rebuilt;
  }
  
  // Clears the re-select request and polls the selector without blocking; any failure is
  // logged and not propagated.
  private void selectAgain() {
    needsToSelectAgain = false;
    try {
      selector.selectNow();
    } catch (Throwable cause) {
      logger.warn("Failed to update SelectionKeys.", cause);
    }
  }
}
